package com.shujia.flink.table

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Demo5MysqlSource {
  def main(args: Array[String]): Unit = {

    //创建flink 环境
    val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    bsEnv.setParallelism(1)

    //设置table 环境的一些参数
    val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner() //使用blikc计划器
      .inStreamingMode() //流模式
      .build()

    // 创建flink  table 环境
    val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)


    bsTableEnv.executeSql(
      """
        |CREATE TABLE student (
        |  id STRING,
        |  name STRING,
        |  age INT,
        |  gender STRING,
        |  clazz STRING
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://master:3306/bigdata',
        |   'table-name' = 'student',
        |   'username' = 'root',
        |   'password' = '123456'
        |)
        |
      """.stripMargin)

    bsTableEnv.executeSql(
      """
        |CREATE TABLE print_table WITH ('connector' = 'print')
        |LIKE student (EXCLUDING ALL)
        |
      """.stripMargin)

    bsTableEnv.executeSql(
      """
        |insert into print_table
        |select * from student
        |
      """.stripMargin)
  }

}
